/*
 * Copyright 2013 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

  private static final InternalLogger logger = InternalLoggerFactory
      .getInstance(DefaultPromise.class);
  private static final InternalLogger rejectedExecutionLogger =
      InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
  private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
      SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
      AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
  private static final Object SUCCESS = new Object();
  private static final Object UNCANCELLABLE = new Object();
  private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(
      ThrowableUtil.unknownStackTrace(
          new CancellationException(), DefaultPromise.class, "cancel(...)"));

  private volatile Object result;
  private final EventExecutor executor;
  /**
   * One or more listeners. Can be a {@link GenericFutureListener} or a {@link
   * DefaultFutureListeners}. If {@code null}, it means either 1) no listeners were added yet or 2)
   * all listeners were notified.
   *
   * Threading - synchronized(this). We must support adding listeners when there is no
   * EventExecutor.
   */
  private Object listeners;
  /**
   * Threading - synchronized(this). We are required to hold the monitor to use Java's underlying
   * wait()/notifyAll().
   */
  private short waiters;

  /**
   * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener
   * notification if the executor changes.
   */
  private boolean notifyingListeners;

  /**
   * Creates a new instance.
   *
   * It is preferable to use {@link EventExecutor#newPromise()} to create a new promise
   *
   * @param executor the {@link EventExecutor} which is used to notify the promise once it is
   * complete. It is assumed this executor will protect against {@link StackOverflowError}
   * exceptions. The executor may be used to avoid {@link StackOverflowError} by executing a {@link
   * Runnable} if the stack depth exceeds a threshold.
   */
  public DefaultPromise(EventExecutor executor) {
    this.executor = checkNotNull(executor, "executor");
  }

  /**
   * See {@link #executor()} for expectations of the executor.
   */
  protected DefaultPromise() {
    // only for subclasses
    executor = null;
  }

  @Override
  public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
      return this;
    }
    throw new IllegalStateException("complete already: " + this);
  }

  @Override
  public boolean trySuccess(V result) {
    return setSuccess0(result);
  }

  @Override
  public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
      return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
  }

  @Override
  public boolean tryFailure(Throwable cause) {
    return setFailure0(cause);
  }

  @Override
  public boolean setUncancellable() {
    if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
      return true;
    }
    Object result = this.result;
    return !isDone0(result) || !isCancelled0(result);
  }

  @Override
  public boolean isSuccess() {
    Object result = this.result;
    return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
  }

  @Override
  public boolean isCancellable() {
    return result == null;
  }

  @Override
  public Throwable cause() {
    Object result = this.result;
    return (result instanceof CauseHolder) ? ((CauseHolder) result).cause : null;
  }

  @Override
  public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {
      addListener0(listener);
    }

    if (isDone()) {
      notifyListeners();
    }

    return this;
  }

  @Override
  public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
    checkNotNull(listeners, "listeners");

    synchronized (this) {
      for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
        if (listener == null) {
          break;
        }
        addListener0(listener);
      }
    }

    if (isDone()) {
      notifyListeners();
    }

    return this;
  }

  @Override
  public Promise<V> removeListener(
      final GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");

    synchronized (this) {
      removeListener0(listener);
    }

    return this;
  }

  @Override
  public Promise<V> removeListeners(
      final GenericFutureListener<? extends Future<? super V>>... listeners) {
    checkNotNull(listeners, "listeners");

    synchronized (this) {
      for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
        if (listener == null) {
          break;
        }
        removeListener0(listener);
      }
    }

    return this;
  }

  @Override
  public Promise<V> await() throws InterruptedException {
    if (isDone()) {
      return this;
    }

    if (Thread.interrupted()) {
      throw new InterruptedException(toString());
    }

    checkDeadLock();

    synchronized (this) {
      while (!isDone()) {
        incWaiters();
        try {
          wait();
        } finally {
          decWaiters();
        }
      }
    }
    return this;
  }

  @Override
  public Promise<V> awaitUninterruptibly() {
    if (isDone()) {
      return this;
    }

    checkDeadLock();

    boolean interrupted = false;
    synchronized (this) {
      while (!isDone()) {
        incWaiters();
        try {
          wait();
        } catch (InterruptedException e) {
          // Interrupted while waiting.
          interrupted = true;
        } finally {
          decWaiters();
        }
      }
    }

    if (interrupted) {
      Thread.currentThread().interrupt();
    }

    return this;
  }

  @Override
  public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return await0(unit.toNanos(timeout), true);
  }

  @Override
  public boolean await(long timeoutMillis) throws InterruptedException {
    return await0(MILLISECONDS.toNanos(timeoutMillis), true);
  }

  @Override
  public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
    try {
      return await0(unit.toNanos(timeout), false);
    } catch (InterruptedException e) {
      // Should not be raised at all.
      throw new InternalError();
    }
  }

  @Override
  public boolean awaitUninterruptibly(long timeoutMillis) {
    try {
      return await0(MILLISECONDS.toNanos(timeoutMillis), false);
    } catch (InterruptedException e) {
      // Should not be raised at all.
      throw new InternalError();
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public V getNow() {
    Object result = this.result;
    if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
      return null;
    }
    return (V) result;
  }

  /**
   * {@inheritDoc}
   *
   * @param mayInterruptIfRunning this value has no effect in this implementation.
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
      if (checkNotifyWaiters()) {
        notifyListeners();
      }
      return true;
    }
    return false;
  }

  @Override
  public boolean isCancelled() {
    return isCancelled0(result);
  }

  @Override
  public boolean isDone() {
    return isDone0(result);
  }

  @Override
  public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed();
    return this;
  }

  @Override
  public Promise<V> syncUninterruptibly() {
    awaitUninterruptibly();
    rethrowIfFailed();
    return this;
  }

  @Override
  public String toString() {
    return toStringBuilder().toString();
  }

  protected StringBuilder toStringBuilder() {
    StringBuilder buf = new StringBuilder(64)
        .append(StringUtil.simpleClassName(this))
        .append('@')
        .append(Integer.toHexString(hashCode()));

    Object result = this.result;
    if (result == SUCCESS) {
      buf.append("(success)");
    } else if (result == UNCANCELLABLE) {
      buf.append("(uncancellable)");
    } else if (result instanceof CauseHolder) {
      buf.append("(failure: ")
          .append(((CauseHolder) result).cause)
          .append(')');
    } else if (result != null) {
      buf.append("(success: ")
          .append(result)
          .append(')');
    } else {
      buf.append("(incomplete)");
    }

    return buf;
  }

  /**
   * Get the executor used to notify listeners when this promise is complete.
   * <p>
   * It is assumed this executor will protect against {@link StackOverflowError} exceptions. The
   * executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the
   * stack depth exceeds a threshold.
   *
   * @return The executor used to notify listeners when this promise is complete.
   */
  protected EventExecutor executor() {
    return executor;
  }

  protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
      throw new BlockingOperationException(toString());
    }
  }

  /**
   * Notify a listener that a future has completed.
   * <p>
   * This method has a fixed depth of {@link #MAX_LISTENER_STACK_DEPTH} that will limit recursion to
   * prevent {@link StackOverflowError} and will stop notifying listeners added after this threshold
   * is exceeded.
   *
   * @param eventExecutor the executor to use to notify the listener {@code listener}.
   * @param future the future that is complete.
   * @param listener the listener to notify.
   */
  protected static void notifyListener(
      EventExecutor eventExecutor, final Future<?> future,
      final GenericFutureListener<?> listener) {
    checkNotNull(eventExecutor, "eventExecutor");
    checkNotNull(future, "future");
    checkNotNull(listener, "listener");
    notifyListenerWithStackOverFlowProtection(eventExecutor, future, listener);
  }

  private void notifyListeners() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
      final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
      final int stackDepth = threadLocals.futureListenerStackDepth();
      if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
        threadLocals.setFutureListenerStackDepth(stackDepth + 1);
        try {
          notifyListenersNow();
        } finally {
          threadLocals.setFutureListenerStackDepth(stackDepth);
        }
        return;
      }
    }

    safeExecute(executor, new Runnable() {
      @Override
      public void run() {
        notifyListenersNow();
      }
    });
  }

  /**
   * The logic in this method should be identical to {@link #notifyListeners()} but cannot share
   * code because the listener(s) cannot be cached for an instance of {@link DefaultPromise} since
   * the listener(s) may be changed and is protected by a synchronized operation.
   */
  private static void notifyListenerWithStackOverFlowProtection(final EventExecutor executor,
      final Future<?> future,
      final GenericFutureListener<?> listener) {
    if (executor.inEventLoop()) {
      final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
      final int stackDepth = threadLocals.futureListenerStackDepth();
      if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
        threadLocals.setFutureListenerStackDepth(stackDepth + 1);
        try {
          notifyListener0(future, listener);
        } finally {
          threadLocals.setFutureListenerStackDepth(stackDepth);
        }
        return;
      }
    }

    safeExecute(executor, new Runnable() {
      @Override
      public void run() {
        notifyListener0(future, listener);
      }
    });
  }

  private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
      // Only proceed if there are listeners to notify and we are not already notifying listeners.
      if (notifyingListeners || this.listeners == null) {
        return;
      }
      notifyingListeners = true;
      listeners = this.listeners;
      this.listeners = null;
    }
    for (; ; ) {
      if (listeners instanceof DefaultFutureListeners) {
        notifyListeners0((DefaultFutureListeners) listeners);
      } else {
        notifyListener0(this, (GenericFutureListener<?>) listeners);
      }
      synchronized (this) {
        if (this.listeners == null) {
          // Nothing can throw from within this method, so setting notifyingListeners back to false does not
          // need to be in a finally block.
          notifyingListeners = false;
          return;
        }
        listeners = this.listeners;
        this.listeners = null;
      }
    }
  }

  private void notifyListeners0(DefaultFutureListeners listeners) {
    GenericFutureListener<?>[] a = listeners.listeners();
    int size = listeners.size();
    for (int i = 0; i < size; i++) {
      notifyListener0(this, a[i]);
    }
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
      l.operationComplete(future);
    } catch (Throwable t) {
      if (logger.isWarnEnabled()) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()",
            t);
      }
    }
  }

  private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
      listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
      ((DefaultFutureListeners) listeners).add(listener);
    } else {
      listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
  }

  private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners instanceof DefaultFutureListeners) {
      ((DefaultFutureListeners) listeners).remove(listener);
    } else if (listeners == listener) {
      listeners = null;
    }
  }

  private boolean setSuccess0(V result) {
    return setValue0(result == null ? SUCCESS : result);
  }

  private boolean setFailure0(Throwable cause) {
    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
  }

  private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
      if (checkNotifyWaiters()) {
        notifyListeners();
      }
      return true;
    }
    return false;
  }

  /**
   * Check if there are any waiters and if so notify these.
   *
   * @return {@code true} if there are any listeners attached to the promise, {@code false}
   * otherwise.
   */
  private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {
      notifyAll();
    }
    return listeners != null;
  }

  private void incWaiters() {
    if (waiters == Short.MAX_VALUE) {
      throw new IllegalStateException("too many waiters: " + this);
    }
    ++waiters;
  }

  private void decWaiters() {
    --waiters;
  }

  private void rethrowIfFailed() {
    Throwable cause = cause();
    if (cause == null) {
      return;
    }

    PlatformDependent.throwException(cause);
  }

  private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
    if (isDone()) {
      return true;
    }

    if (timeoutNanos <= 0) {
      return isDone();
    }

    if (interruptable && Thread.interrupted()) {
      throw new InterruptedException(toString());
    }

    checkDeadLock();

    long startTime = System.nanoTime();
    long waitTime = timeoutNanos;
    boolean interrupted = false;
    try {
      for (; ; ) {
        synchronized (this) {
          if (isDone()) {
            return true;
          }
          incWaiters();
          try {
            wait(waitTime / 1000000, (int) (waitTime % 1000000));
          } catch (InterruptedException e) {
            if (interruptable) {
              throw e;
            } else {
              interrupted = true;
            }
          } finally {
            decWaiters();
          }
        }
        if (isDone()) {
          return true;
        } else {
          waitTime = timeoutNanos - (System.nanoTime() - startTime);
          if (waitTime <= 0) {
            return isDone();
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Notify all progressive listeners.
   * <p>
   * No attempt is made to ensure notification order if multiple calls are made to this method
   * before the original invocation completes.
   * <p>
   * This will do an iteration over all listeners to get all of type {@link
   * GenericProgressiveFutureListener}s.
   *
   * @param progress the new progress.
   * @param total the total progress.
   */
  @SuppressWarnings("unchecked")
  void notifyProgressiveListeners(final long progress, final long total) {
    final Object listeners = progressiveListeners();
    if (listeners == null) {
      return;
    }

    final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;

    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
      if (listeners instanceof GenericProgressiveFutureListener[]) {
        notifyProgressiveListeners0(
            self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
      } else {
        notifyProgressiveListener0(
            self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress,
            total);
      }
    } else {
      if (listeners instanceof GenericProgressiveFutureListener[]) {
        final GenericProgressiveFutureListener<?>[] array =
            (GenericProgressiveFutureListener<?>[]) listeners;
        safeExecute(executor, new Runnable() {
          @Override
          public void run() {
            notifyProgressiveListeners0(self, array, progress, total);
          }
        });
      } else {
        final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
            (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
        safeExecute(executor, new Runnable() {
          @Override
          public void run() {
            notifyProgressiveListener0(self, l, progress, total);
          }
        });
      }
    }
  }

  /**
   * Returns a {@link GenericProgressiveFutureListener}, an array of {@link
   * GenericProgressiveFutureListener}, or {@code null}.
   */
  private synchronized Object progressiveListeners() {
    Object listeners = this.listeners;
    if (listeners == null) {
      // No listeners added
      return null;
    }

    if (listeners instanceof DefaultFutureListeners) {
      // Copy DefaultFutureListeners into an array of listeners.
      DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
      int progressiveSize = dfl.progressiveSize();
      switch (progressiveSize) {
        case 0:
          return null;
        case 1:
          for (GenericFutureListener<?> l : dfl.listeners()) {
            if (l instanceof GenericProgressiveFutureListener) {
              return l;
            }
          }
          return null;
      }

      GenericFutureListener<?>[] array = dfl.listeners();
      GenericProgressiveFutureListener<?>[] copy = new GenericProgressiveFutureListener[progressiveSize];
      for (int i = 0, j = 0; j < progressiveSize; i++) {
        GenericFutureListener<?> l = array[i];
        if (l instanceof GenericProgressiveFutureListener) {
          copy[j++] = (GenericProgressiveFutureListener<?>) l;
        }
      }

      return copy;
    } else if (listeners instanceof GenericProgressiveFutureListener) {
      return listeners;
    } else {
      // Only one listener was added and it's not a progressive listener.
      return null;
    }
  }

  private static void notifyProgressiveListeners0(
      ProgressiveFuture<?> future, GenericProgressiveFutureListener<?>[] listeners, long progress,
      long total) {
    for (GenericProgressiveFutureListener<?> l : listeners) {
      if (l == null) {
        break;
      }
      notifyProgressiveListener0(future, l, progress, total);
    }
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  private static void notifyProgressiveListener0(
      ProgressiveFuture future, GenericProgressiveFutureListener l, long progress, long total) {
    try {
      l.operationProgressed(future, progress, total);
    } catch (Throwable t) {
      if (logger.isWarnEnabled()) {
        logger
            .warn("An exception was thrown by " + l.getClass().getName() + ".operationProgressed()",
                t);
      }
    }
  }

  private static boolean isCancelled0(Object result) {
    return result instanceof CauseHolder
        && ((CauseHolder) result).cause instanceof CancellationException;
  }

  private static boolean isDone0(Object result) {
    return result != null && result != UNCANCELLABLE;
  }

  private static final class CauseHolder {

    final Throwable cause;

    CauseHolder(Throwable cause) {
      this.cause = cause;
    }
  }

  private static void safeExecute(EventExecutor executor, Runnable task) {
    try {
      executor.execute(task);
    } catch (Throwable t) {
      rejectedExecutionLogger
          .error("Failed to submit a listener notification task. Event loop shut down?", t);
    }
  }
}
